/* GStreamer
 * Copyright (C) 2007 David Schleef <ds@schleef.org>
 *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */
/**
 * SECTION:gstappsrc
 * @title: GstAppSrc
 * @short_description: Easy way for applications to inject buffers into a
 *     pipeline
 * @see_also: #GstBaseSrc, appsink
 *
 * The appsrc element can be used by applications to insert data into a
 * GStreamer pipeline. Unlike most GStreamer elements, appsrc provides
 * external API functions.
 *
 * appsrc can be used by linking with the libgstapp library to access the
 * methods directly or by using the appsrc action signals.
 *
 * Before operating appsrc, the caps property must be set to fixed caps
 * describing the format of the data that will be pushed with appsrc. An
 * exception to this is when pushing buffers with unknown caps, in which case no
 * caps should be set. This is typically true of file-like sources that push raw
 * byte buffers. If you don't want to explicitly set the caps, you can use
 * gst_app_src_push_sample. This method gets the caps associated with the
 * sample and sets them on the appsrc replacing any previously set caps (if
 * different from sample's caps).
 *
 * The main way of handing data to the appsrc element is by calling the
 * gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
 * This will put the buffer onto a queue from which appsrc will read from in its
 * streaming thread. It is important to note that data transport will not happen
 * from the thread that performed the push-buffer call.
 *
 * The "max-bytes", "max-buffers" and "max-time" properties control how much
 * data can be queued in appsrc before appsrc considers the queue full. A
 * filled internal queue will always signal the "enough-data" signal, which
 * signals the application that it should stop pushing data into appsrc. The
 * "block" property will cause appsrc to block the push-buffer method until
 * free data becomes available again.
 *
 * When the internal queue is running out of data, the "need-data" signal is
 * emitted, which signals the application that it should start pushing more data
 * into appsrc.
 *
 * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
 * "seek-data" signal when the "stream-mode" property is set to "seekable" or
 * "random-access". The signal argument will contain the new desired position in
 * the stream expressed in the unit set with the "format" property. After
 * receiving the seek-data signal, the application should push-buffers from the
 * new position.
 *
 * These signals allow the application to operate the appsrc in two different
 * ways:
 *
 * The push mode, in which the application repeatedly calls the push-buffer/push-sample
 * method with a new buffer/sample. Optionally, the queue size in the appsrc
 * can be controlled with the enough-data and need-data signals by respectively
 * stopping/starting the push-buffer/push-sample calls. This is a typical
 * mode of operation for the stream-type "stream" and "seekable". Use this
 * mode when implementing various network protocols or hardware devices.
 *
 * The pull mode, in which the need-data signal triggers the next push-buffer call.
 * This mode is typically used in the "random-access" stream-type. Use this
 * mode for file access or other randomly accessible sources. In this mode, a
 * buffer of exactly the amount of bytes given by the need-data signal should be
 * pushed into appsrc.
 *
 * In all modes, the size property on appsrc should contain the total stream
 * size in bytes. Setting this property is mandatory in the random-access mode.
 * For the stream and seekable modes, setting this property is optional but
 * recommended.
 *
 * When the application has finished pushing data into appsrc, it should call
 * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
 * this call, no more buffers can be pushed into appsrc until a flushing seek
 * occurs or the state of the appsrc has gone through READY.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gst/gst.h>
#include <gst/base/base.h>

#include <string.h>

#include "gstappsrc.h"

typedef enum
{
  NOONE_WAITING = 0,
  STREAM_WAITING = 1 << 0,      /* streaming thread is waiting for application thread */
  APP_WAITING = 1 << 1,         /* application thread is waiting for streaming thread */
} GstAppSrcWaitStatus;

typedef struct
{
  GstAppSrcCallbacks callbacks;
  gpointer user_data;
  GDestroyNotify destroy_notify;
  gint ref_count;
} Callbacks;

static Callbacks *
callbacks_ref (Callbacks * callbacks)
{
  g_atomic_int_inc (&callbacks->ref_count);

  return callbacks;
}

static void
callbacks_unref (Callbacks * callbacks)
{
  if (!g_atomic_int_dec_and_test (&callbacks->ref_count))
    return;

  if (callbacks->destroy_notify)
    callbacks->destroy_notify (callbacks->user_data);

  g_free (callbacks);
}


struct _GstAppSrcPrivate
{
  GCond cond;
  GMutex mutex;
  GstQueueArray *queue;
  GstAppSrcWaitStatus wait_status;

  GstCaps *last_caps;
  GstCaps *current_caps;
  /* last segment received on the input */
  GstSegment last_segment;
  /* currently configured segment for the output */
  GstSegment current_segment;
  /* queue up a segment event based on last_segment before
   * the next buffer of buffer list */
  gboolean pending_custom_segment;

  /* the next buffer that will be queued needs a discont flag
   * because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */
  gboolean need_discont_upstream;
  /* the next buffer that will be dequeued needs a discont flag
   * because the previous one was dropped - GST_APP_LEAKY_TYPE_DOWNSTREAM */
  gboolean need_discont_downstream;

  gint64 size;
  GstClockTime duration;
  GstAppStreamType stream_type;
  guint64 max_bytes, max_buffers, max_time;
  GstFormat format;
  gboolean block;
  gchar *uri;

  gboolean flushing;
  gboolean started;
  gboolean is_eos;
  guint64 queued_bytes, queued_buffers;
  /* Used to calculate the current time level */
  GstClockTime last_in_running_time, last_out_running_time;
  /* Updated based on the above whenever they change */
  GstClockTime queued_time;
  guint64 offset;
  GstAppStreamType current_type;

  guint64 min_latency;
  guint64 max_latency;
  gboolean emit_signals;
  guint min_percent;
  gboolean handle_segment_change;

  GstAppLeakyType leaky_type;

  Callbacks *callbacks;
};

GST_DEBUG_CATEGORY_STATIC (app_src_debug);
#define GST_CAT_DEFAULT app_src_debug

enum
{
  /* signals */
  SIGNAL_NEED_DATA,
  SIGNAL_ENOUGH_DATA,
  SIGNAL_SEEK_DATA,

  /* actions */
  SIGNAL_PUSH_BUFFER,
  SIGNAL_END_OF_STREAM,
  SIGNAL_PUSH_SAMPLE,
  SIGNAL_PUSH_BUFFER_LIST,

  LAST_SIGNAL
};

#define DEFAULT_PROP_SIZE          -1
#define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
#define DEFAULT_PROP_MAX_BYTES     200000
#define DEFAULT_PROP_MAX_BUFFERS   0
#define DEFAULT_PROP_MAX_TIME      (0 * GST_SECOND)
#define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
#define DEFAULT_PROP_BLOCK         FALSE
#define DEFAULT_PROP_IS_LIVE       FALSE
#define DEFAULT_PROP_MIN_LATENCY   -1
#define DEFAULT_PROP_MAX_LATENCY   -1
#define DEFAULT_PROP_EMIT_SIGNALS  TRUE
#define DEFAULT_PROP_MIN_PERCENT   0
#define DEFAULT_PROP_CURRENT_LEVEL_BYTES   0
#define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS 0
#define DEFAULT_PROP_CURRENT_LEVEL_TIME    0
#define DEFAULT_PROP_DURATION      GST_CLOCK_TIME_NONE
#define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE FALSE
#define DEFAULT_PROP_LEAKY_TYPE    GST_APP_LEAKY_TYPE_NONE

enum
{
  PROP_0,
  PROP_CAPS,
  PROP_SIZE,
  PROP_STREAM_TYPE,
  PROP_MAX_BYTES,
  PROP_MAX_BUFFERS,
  PROP_MAX_TIME,
  PROP_FORMAT,
  PROP_BLOCK,
  PROP_IS_LIVE,
  PROP_MIN_LATENCY,
  PROP_MAX_LATENCY,
  PROP_EMIT_SIGNALS,
  PROP_MIN_PERCENT,
  PROP_CURRENT_LEVEL_BYTES,
  PROP_CURRENT_LEVEL_BUFFERS,
  PROP_CURRENT_LEVEL_TIME,
  PROP_DURATION,
  PROP_HANDLE_SEGMENT_CHANGE,
  PROP_LEAKY_TYPE,
  PROP_LAST
};

static GstStaticPadTemplate gst_app_src_template =
GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

static void gst_app_src_uri_handler_init (gpointer g_iface,
    gpointer iface_data);

static void gst_app_src_dispose (GObject * object);
static void gst_app_src_finalize (GObject * object);

static void gst_app_src_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_app_src_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

static gboolean gst_app_src_send_event (GstElement * element, GstEvent * event);

static void gst_app_src_set_latencies (GstAppSrc * appsrc,
    gboolean do_min, guint64 min, gboolean do_max, guint64 max);

static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc);
static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc,
    GstCaps * filter);
static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, guint64 offset,
    guint size, GstBuffer ** buf);
static gboolean gst_app_src_start (GstBaseSrc * bsrc);
static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
static gboolean gst_app_src_event (GstBaseSrc * src, GstEvent * event);

static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
    GstBuffer * buffer);
static GstFlowReturn gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
    GstBufferList * buffer_list);
static GstFlowReturn gst_app_src_push_sample_action (GstAppSrc * appsrc,
    GstSample * sample);

static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };

#define gst_app_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
    G_ADD_PRIVATE (GstAppSrc)
    G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));

static void
gst_app_src_class_init (GstAppSrcClass * klass)
{
  GObjectClass *gobject_class = (GObjectClass *) klass;
  GstElementClass *element_class = (GstElementClass *) klass;
  GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;

  GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");

  gobject_class->dispose = gst_app_src_dispose;
  gobject_class->finalize = gst_app_src_finalize;

  gobject_class->set_property = gst_app_src_set_property;
  gobject_class->get_property = gst_app_src_get_property;

  /**
   * GstAppSrc:caps:
   *
   * The GstCaps that will negotiated downstream and will be put
   * on outgoing buffers.
   */
  g_object_class_install_property (gobject_class, PROP_CAPS,
      g_param_spec_boxed ("caps", "Caps",
          "The allowed caps for the src pad", GST_TYPE_CAPS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:format:
   *
   * The format to use for segment events. When the source is producing
   * timestamped buffers this property should be set to GST_FORMAT_TIME.
   */
  g_object_class_install_property (gobject_class, PROP_FORMAT,
      g_param_spec_enum ("format", "Format",
          "The format of the segment events and seek", GST_TYPE_FORMAT,
          DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:size:
   *
   * The total size in bytes of the data stream. If the total size is known, it
   * is recommended to configure it with this property.
   */
  g_object_class_install_property (gobject_class, PROP_SIZE,
      g_param_spec_int64 ("size", "Size",
          "The size of the data stream in bytes (-1 if unknown)",
          -1, G_MAXINT64, DEFAULT_PROP_SIZE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:stream-type:
   *
   * The type of stream that this source is producing.  For seekable streams the
   * application should connect to the seek-data signal.
   */
  g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
      g_param_spec_enum ("stream-type", "Stream Type",
          "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
          DEFAULT_PROP_STREAM_TYPE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:max-bytes:
   *
   * The maximum amount of bytes that can be queued internally.
   * After the maximum amount of bytes are queued, appsrc will emit the
   * "enough-data" signal.
   */
  g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
      g_param_spec_uint64 ("max-bytes", "Max bytes",
          "The maximum number of bytes to queue internally (0 = unlimited)",
          0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:max-buffers:
   *
   * The maximum amount of buffers that can be queued internally.
   * After the maximum amount of buffers are queued, appsrc will emit the
   * "enough-data" signal.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
      g_param_spec_uint64 ("max-buffers", "Max buffers",
          "The maximum number of buffers to queue internally (0 = unlimited)",
          0, G_MAXUINT64, DEFAULT_PROP_MAX_BUFFERS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:max-time:
   *
   * The maximum amount of time that can be queued internally.
   * After the maximum amount of time are queued, appsrc will emit the
   * "enough-data" signal.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_MAX_TIME,
      g_param_spec_uint64 ("max-time", "Max time",
          "The maximum amount of time to queue internally (0 = unlimited)",
          0, G_MAXUINT64, DEFAULT_PROP_MAX_TIME,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:block:
   *
   * When max-bytes are queued and after the enough-data signal has been emitted,
   * block any further push-buffer calls until the amount of queued bytes drops
   * below the max-bytes limit.
   */
  g_object_class_install_property (gobject_class, PROP_BLOCK,
      g_param_spec_boolean ("block", "Block",
          "Block push-buffer when max-bytes are queued",
          DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:is-live:
   *
   * Instruct the source to behave like a live source. This includes that it
   * will only push out buffers in the PLAYING state.
   */
  g_object_class_install_property (gobject_class, PROP_IS_LIVE,
      g_param_spec_boolean ("is-live", "Is Live",
          "Whether to act as a live source",
          DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:min-latency:
   *
   * The minimum latency of the source. A value of -1 will use the default
   * latency calculations of #GstBaseSrc.
   */
  g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
      g_param_spec_int64 ("min-latency", "Min Latency",
          "The minimum latency (-1 = default)",
          -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc::max-latency:
   *
   * The maximum latency of the source. A value of -1 means an unlimited amount
   * of latency.
   */
  g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
      g_param_spec_int64 ("max-latency", "Max Latency",
          "The maximum latency (-1 = unlimited)",
          -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:emit-signals:
   *
   * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
   * This option is by default enabled for backwards compatibility reasons but
   * can disabled when needed because signal emission is expensive.
   */
  g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
      g_param_spec_boolean ("emit-signals", "Emit signals",
          "Emit need-data, enough-data and seek-data signals",
          DEFAULT_PROP_EMIT_SIGNALS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:min-percent:
   *
   * Make appsrc emit the "need-data" signal when the amount of bytes in the
   * queue drops below this percentage of max-bytes.
   */
  g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
      g_param_spec_uint ("min-percent", "Min Percent",
          "Emit need-data when queued bytes drops below this percent of max-bytes",
          0, 100, DEFAULT_PROP_MIN_PERCENT,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:current-level-bytes:
   *
   * The number of currently queued bytes inside appsrc.
   *
   * Since: 1.2
   */
  g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BYTES,
      g_param_spec_uint64 ("current-level-bytes", "Current Level Bytes",
          "The number of currently queued bytes",
          0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:current-level-buffers:
   *
   * The number of currently queued buffers inside appsrc.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BUFFERS,
      g_param_spec_uint64 ("current-level-buffers", "Current Level Buffers",
          "The number of currently queued buffers",
          0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BUFFERS,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:current-level-time:
   *
   * The amount of currently queued time inside appsrc.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME,
      g_param_spec_uint64 ("current-level-time", "Current Level Time",
          "The amount of currently queued time",
          0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_TIME,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:duration:
   *
   * The total duration in nanoseconds of the data stream. If the total duration is known, it
   * is recommended to configure it with this property.
   *
   * Since: 1.10
   */
  g_object_class_install_property (gobject_class, PROP_DURATION,
      g_param_spec_uint64 ("duration", "Duration",
          "The duration of the data stream in nanoseconds (GST_CLOCK_TIME_NONE if unknown)",
          0, G_MAXUINT64, DEFAULT_PROP_DURATION,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:handle-segment-change:
   *
   * When enabled, appsrc will check GstSegment in GstSample which was
   * pushed via gst_app_src_push_sample() or "push-sample" signal action.
   * If a GstSegment is changed, corresponding segment event will be followed
   * by next data flow.
   *
   * FIXME: currently only GST_FORMAT_TIME format is supported and therefore
   * GstAppSrc::format should be time. However, possibly #GstAppSrc can support
   * other formats.
   *
   * Since: 1.18
   */
  g_object_class_install_property (gobject_class, PROP_HANDLE_SEGMENT_CHANGE,
      g_param_spec_boolean ("handle-segment-change", "Handle Segment Change",
          "Whether to detect and handle changed time format GstSegment in "
          "GstSample. User should set valid GstSegment in GstSample. "
          "Must set format property as \"time\" to enable this property",
          DEFAULT_PROP_HANDLE_SEGMENT_CHANGE,
          G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
          G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:leaky-type:
   *
   * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
   * will drop any buffers that are pushed into it once its internal queue is
   * full. The selected type defines whether to drop the oldest or new
   * buffers.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_LEAKY_TYPE,
      g_param_spec_enum ("leaky-type", "Leaky Type",
          "Whether to drop buffers once the internal queue is full",
          GST_TYPE_APP_LEAKY_TYPE,
          DEFAULT_PROP_LEAKY_TYPE,
          G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
          G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc::need-data:
   * @appsrc: the appsrc element that emitted the signal
   * @length: the amount of bytes needed.
   *
   * Signal that the source needs more data. In the callback or from another
   * thread you should call push-buffer or end-of-stream.
   *
   * @length is just a hint and when it is set to -1, any number of bytes can be
   * pushed into @appsrc.
   *
   * You can call push-buffer multiple times until the enough-data signal is
   * fired.
   */
  gst_app_src_signals[SIGNAL_NEED_DATA] =
      g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
      G_STRUCT_OFFSET (GstAppSrcClass, need_data),
      NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);

  /**
   * GstAppSrc::enough-data:
   * @appsrc: the appsrc element that emitted the signal
   *
   * Signal that the source has enough data. It is recommended that the
   * application stops calling push-buffer until the need-data signal is
   * emitted again to avoid excessive buffer queueing.
   */
  gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
      g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
      G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
      NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);

  /**
   * GstAppSrc::seek-data:
   * @appsrc: the appsrc element that emitted the signal
   * @offset: the offset to seek to
   *
   * Seek to the given offset. The next push-buffer should produce buffers from
   * the new @offset.
   * This callback is only called for seekable stream types.
   *
   * Returns: %TRUE if the seek succeeded.
   */
  gst_app_src_signals[SIGNAL_SEEK_DATA] =
      g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
      G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
      NULL, NULL, NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);

   /**
    * GstAppSrc::push-buffer:
    * @appsrc: the appsrc
    * @buffer: (transfer none): a buffer to push
    *
    * Adds a buffer to the queue of buffers that the appsrc element will
    * push to its source pad.
    *
    * This function does not take ownership of the buffer, but it takes a
    * reference so the buffer can be unreffed at any time after calling this
    * function.
    *
    * When the block property is TRUE, this function can block until free space
    * becomes available in the queue.
    */
  gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
      g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
          push_buffer), NULL, NULL, NULL,
      GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);

   /**
    * GstAppSrc::push-buffer-list:
    * @appsrc: the appsrc
    * @buffer_list: (transfer none): a buffer list to push
    *
    * Adds a buffer list to the queue of buffers and buffer lists that the
    * appsrc element will push to its source pad.
    *
    * This function does not take ownership of the buffer list, but it takes a
    * reference so the buffer list can be unreffed at any time after calling
    * this function.
    *
    * When the block property is TRUE, this function can block until free space
    * becomes available in the queue.
    *
    * Since: 1.14
    */
  gst_app_src_signals[SIGNAL_PUSH_BUFFER_LIST] =
      g_signal_new ("push-buffer-list", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
          push_buffer_list), NULL, NULL, NULL,
      GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER_LIST);

  /**
    * GstAppSrc::push-sample:
    * @appsrc: the appsrc
    * @sample: (transfer none): a sample from which extract buffer to push
    *
    * Extract a buffer from the provided sample and adds the extracted buffer
    * to the queue of buffers that the appsrc element will
    * push to its source pad. This function set the appsrc caps based on the caps
    * in the sample and reset the caps if they change.
    * Only the caps and the buffer of the provided sample are used and not
    * for example the segment in the sample.
    *
    * This function does not take ownership of the sample, but it takes a
    * reference so the sample can be unreffed at any time after calling this
    * function.
    *
    * When the block property is TRUE, this function can block until free space
    * becomes available in the queue.
    *
    * Since: 1.6
    */
  gst_app_src_signals[SIGNAL_PUSH_SAMPLE] =
      g_signal_new ("push-sample", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
          push_sample), NULL, NULL, NULL,
      GST_TYPE_FLOW_RETURN, 1, GST_TYPE_SAMPLE);


   /**
    * GstAppSrc::end-of-stream:
    * @appsrc: the appsrc
    *
    * Notify @appsrc that no more buffer are available.
    */
  gst_app_src_signals[SIGNAL_END_OF_STREAM] =
      g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
          end_of_stream), NULL, NULL, NULL,
      GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);

  gst_element_class_set_static_metadata (element_class, "AppSrc",
      "Generic/Source", "Allow the application to feed buffers to a pipeline",
      "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");

  gst_element_class_add_static_pad_template (element_class,
      &gst_app_src_template);

  element_class->send_event = gst_app_src_send_event;

  basesrc_class->negotiate = gst_app_src_negotiate;
  basesrc_class->get_caps = gst_app_src_internal_get_caps;
  basesrc_class->create = gst_app_src_create;
  basesrc_class->start = gst_app_src_start;
  basesrc_class->stop = gst_app_src_stop;
  basesrc_class->unlock = gst_app_src_unlock;
  basesrc_class->unlock_stop = gst_app_src_unlock_stop;
  basesrc_class->do_seek = gst_app_src_do_seek;
  basesrc_class->is_seekable = gst_app_src_is_seekable;
  basesrc_class->get_size = gst_app_src_do_get_size;
  basesrc_class->query = gst_app_src_query;
  basesrc_class->event = gst_app_src_event;

  klass->push_buffer = gst_app_src_push_buffer_action;
  klass->push_buffer_list = gst_app_src_push_buffer_list_action;
  klass->push_sample = gst_app_src_push_sample_action;
  klass->end_of_stream = gst_app_src_end_of_stream;
}

static void
gst_app_src_init (GstAppSrc * appsrc)
{
  GstAppSrcPrivate *priv;

  priv = appsrc->priv = gst_app_src_get_instance_private (appsrc);

  g_mutex_init (&priv->mutex);
  g_cond_init (&priv->cond);
  priv->queue = gst_queue_array_new (16);
  priv->wait_status = NOONE_WAITING;

  priv->size = DEFAULT_PROP_SIZE;
  priv->duration = DEFAULT_PROP_DURATION;
  priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
  priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
  priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
  priv->max_time = DEFAULT_PROP_MAX_TIME;
  priv->format = DEFAULT_PROP_FORMAT;
  priv->block = DEFAULT_PROP_BLOCK;
  priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
  priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
  priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
  priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
  priv->handle_segment_change = DEFAULT_PROP_HANDLE_SEGMENT_CHANGE;
  priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE;

  gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
}

/* Must be called with priv->mutex */
static void
gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
{
  GstMiniObject *obj;
  GstAppSrcPrivate *priv = src->priv;
  GstCaps *requeue_caps = NULL;

  while (!gst_queue_array_is_empty (priv->queue)) {
    obj = gst_queue_array_pop_head (priv->queue);
    if (obj) {
      if (GST_IS_CAPS (obj) && retain_last_caps) {
        gst_caps_replace (&requeue_caps, GST_CAPS_CAST (obj));
      }
      gst_mini_object_unref (obj);
    }
  }

  if (requeue_caps) {
    gst_queue_array_push_tail (priv->queue, requeue_caps);
  }

  priv->queued_bytes = 0;
  priv->queued_buffers = 0;
  priv->queued_time = 0;
  priv->last_in_running_time = GST_CLOCK_TIME_NONE;
  priv->last_out_running_time = GST_CLOCK_TIME_NONE;
  priv->need_discont_upstream = FALSE;
  priv->need_discont_downstream = FALSE;
}

static void
gst_app_src_dispose (GObject * obj)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
  GstAppSrcPrivate *priv = appsrc->priv;
  Callbacks *callbacks = NULL;

  GST_OBJECT_LOCK (appsrc);
  if (priv->current_caps) {
    gst_caps_unref (priv->current_caps);
    priv->current_caps = NULL;
  }
  if (priv->last_caps) {
    gst_caps_unref (priv->last_caps);
    priv->last_caps = NULL;
  }
  GST_OBJECT_UNLOCK (appsrc);

  g_mutex_lock (&priv->mutex);
  if (priv->callbacks)
    callbacks = g_steal_pointer (&priv->callbacks);
  gst_app_src_flush_queued (appsrc, FALSE);
  g_mutex_unlock (&priv->mutex);

  g_clear_pointer (&callbacks, callbacks_unref);

  G_OBJECT_CLASS (parent_class)->dispose (obj);
}

static void
gst_app_src_finalize (GObject * obj)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
  GstAppSrcPrivate *priv = appsrc->priv;

  g_mutex_clear (&priv->mutex);
  g_cond_clear (&priv->cond);
  gst_queue_array_free (priv->queue);

  g_free (priv->uri);

  G_OBJECT_CLASS (parent_class)->finalize (obj);
}

static GstCaps *
gst_app_src_internal_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
{
  GstAppSrc *appsrc = GST_APP_SRC (bsrc);
  GstCaps *caps;

  GST_OBJECT_LOCK (appsrc);
  if ((caps = appsrc->priv->current_caps))
    gst_caps_ref (caps);
  GST_OBJECT_UNLOCK (appsrc);

  if (filter) {
    if (caps) {
      GstCaps *intersection =
          gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
      gst_caps_unref (caps);
      caps = intersection;
    } else {
      caps = gst_caps_ref (filter);
    }
  }

  GST_DEBUG_OBJECT (appsrc, "caps: %" GST_PTR_FORMAT, caps);
  return caps;
}

static void
gst_app_src_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
  GstAppSrcPrivate *priv = appsrc->priv;

  switch (prop_id) {
    case PROP_CAPS:
      gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
      break;
    case PROP_SIZE:
      gst_app_src_set_size (appsrc, g_value_get_int64 (value));
      break;
    case PROP_STREAM_TYPE:
      gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
      break;
    case PROP_MAX_BYTES:
      gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
      break;
    case PROP_MAX_BUFFERS:
      gst_app_src_set_max_buffers (appsrc, g_value_get_uint64 (value));
      break;
    case PROP_MAX_TIME:
      gst_app_src_set_max_time (appsrc, g_value_get_uint64 (value));
      break;
    case PROP_FORMAT:
      priv->format = g_value_get_enum (value);
      break;
    case PROP_BLOCK:
      priv->block = g_value_get_boolean (value);
      break;
    case PROP_IS_LIVE:
      gst_base_src_set_live (GST_BASE_SRC (appsrc),
          g_value_get_boolean (value));
      break;
    case PROP_MIN_LATENCY:
      gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
          FALSE, -1);
      break;
    case PROP_MAX_LATENCY:
      gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
          g_value_get_int64 (value));
      break;
    case PROP_EMIT_SIGNALS:
      gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
      break;
    case PROP_MIN_PERCENT:
      priv->min_percent = g_value_get_uint (value);
      break;
    case PROP_DURATION:
      gst_app_src_set_duration (appsrc, g_value_get_uint64 (value));
      break;
    case PROP_HANDLE_SEGMENT_CHANGE:
      priv->handle_segment_change = g_value_get_boolean (value);
      break;
    case PROP_LEAKY_TYPE:
      priv->leaky_type = g_value_get_enum (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

static void
gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
    GParamSpec * pspec)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
  GstAppSrcPrivate *priv = appsrc->priv;

  switch (prop_id) {
    case PROP_CAPS:
      g_value_take_boxed (value, gst_app_src_get_caps (appsrc));
      break;
    case PROP_SIZE:
      g_value_set_int64 (value, gst_app_src_get_size (appsrc));
      break;
    case PROP_STREAM_TYPE:
      g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
      break;
    case PROP_MAX_BYTES:
      g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
      break;
    case PROP_MAX_BUFFERS:
      g_value_set_uint64 (value, gst_app_src_get_max_buffers (appsrc));
      break;
    case PROP_MAX_TIME:
      g_value_set_uint64 (value, gst_app_src_get_max_time (appsrc));
      break;
    case PROP_FORMAT:
      g_value_set_enum (value, priv->format);
      break;
    case PROP_BLOCK:
      g_value_set_boolean (value, priv->block);
      break;
    case PROP_IS_LIVE:
      g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
      break;
    case PROP_MIN_LATENCY:
    {
      guint64 min = 0;

      gst_app_src_get_latency (appsrc, &min, NULL);
      g_value_set_int64 (value, min);
      break;
    }
    case PROP_MAX_LATENCY:
    {
      guint64 max = 0;

      gst_app_src_get_latency (appsrc, NULL, &max);
      g_value_set_int64 (value, max);
      break;
    }
    case PROP_EMIT_SIGNALS:
      g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
      break;
    case PROP_MIN_PERCENT:
      g_value_set_uint (value, priv->min_percent);
      break;
    case PROP_CURRENT_LEVEL_BYTES:
      g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc));
      break;
    case PROP_CURRENT_LEVEL_BUFFERS:
      g_value_set_uint64 (value,
          gst_app_src_get_current_level_buffers (appsrc));
      break;
    case PROP_CURRENT_LEVEL_TIME:
      g_value_set_uint64 (value, gst_app_src_get_current_level_time (appsrc));
      break;
    case PROP_DURATION:
      g_value_set_uint64 (value, gst_app_src_get_duration (appsrc));
      break;
    case PROP_HANDLE_SEGMENT_CHANGE:
      g_value_set_boolean (value, priv->handle_segment_change);
      break;
    case PROP_LEAKY_TYPE:
      g_value_set_enum (value, priv->leaky_type);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

static gboolean
gst_app_src_send_event (GstElement * element, GstEvent * event)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (element);
  GstAppSrcPrivate *priv = appsrc->priv;

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_FLUSH_STOP:
      g_mutex_lock (&priv->mutex);
      gst_app_src_flush_queued (appsrc, TRUE);
      g_mutex_unlock (&priv->mutex);
      break;
    default:
      if (GST_EVENT_IS_SERIALIZED (event)) {
        GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event);
        g_mutex_lock (&priv->mutex);
        gst_queue_array_push_tail (priv->queue, event);

        if ((priv->wait_status & STREAM_WAITING))
          g_cond_broadcast (&priv->cond);

        g_mutex_unlock (&priv->mutex);
        return TRUE;
      }
      break;
  }

  return GST_CALL_PARENT_WITH_DEFAULT (GST_ELEMENT_CLASS, send_event, (element,
          event), FALSE);
}

static gboolean
gst_app_src_unlock (GstBaseSrc * bsrc)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
  GstAppSrcPrivate *priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);
  GST_DEBUG_OBJECT (appsrc, "unlock start");
  priv->flushing = TRUE;
  g_cond_broadcast (&priv->cond);
  g_mutex_unlock (&priv->mutex);

  return TRUE;
}

static gboolean
gst_app_src_unlock_stop (GstBaseSrc * bsrc)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
  GstAppSrcPrivate *priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);
  GST_DEBUG_OBJECT (appsrc, "unlock stop");
  priv->flushing = FALSE;
  g_cond_broadcast (&priv->cond);
  g_mutex_unlock (&priv->mutex);

  return TRUE;
}

static gboolean
gst_app_src_start (GstBaseSrc * bsrc)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
  GstAppSrcPrivate *priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);
  GST_DEBUG_OBJECT (appsrc, "starting");
  priv->started = TRUE;
  /* set the offset to -1 so that we always do a first seek. This is only used
   * in random-access mode. */
  priv->offset = -1;
  priv->flushing = FALSE;
  g_mutex_unlock (&priv->mutex);

  gst_base_src_set_format (bsrc, priv->format);
  gst_segment_init (&priv->last_segment, priv->format);
  gst_segment_init (&priv->current_segment, priv->format);
  priv->pending_custom_segment = FALSE;

  return TRUE;
}

static gboolean
gst_app_src_stop (GstBaseSrc * bsrc)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
  GstAppSrcPrivate *priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);
  GST_DEBUG_OBJECT (appsrc, "stopping");
  priv->is_eos = FALSE;
  priv->flushing = TRUE;
  priv->started = FALSE;
  gst_app_src_flush_queued (appsrc, TRUE);
  g_cond_broadcast (&priv->cond);
  g_mutex_unlock (&priv->mutex);

  return TRUE;
}

static gboolean
gst_app_src_is_seekable (GstBaseSrc * src)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
  GstAppSrcPrivate *priv = appsrc->priv;
  gboolean res = FALSE;

  switch (priv->stream_type) {
    case GST_APP_STREAM_TYPE_STREAM:
      break;
    case GST_APP_STREAM_TYPE_SEEKABLE:
    case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
      res = TRUE;
      break;
  }
  return res;
}

static gboolean
gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (src);

  *size = gst_app_src_get_size (appsrc);

  return TRUE;
}

static gboolean
gst_app_src_query (GstBaseSrc * src, GstQuery * query)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
  GstAppSrcPrivate *priv = appsrc->priv;
  gboolean res;

  switch (GST_QUERY_TYPE (query)) {
    case GST_QUERY_LATENCY:
    {
      GstClockTime min, max;
      gboolean live;

      /* Query the parent class for the defaults */
      res = gst_base_src_query_latency (src, &live, &min, &max);

      /* overwrite with our values when we need to */
      g_mutex_lock (&priv->mutex);
      if (priv->min_latency != -1) {
        min = priv->min_latency;
        max = priv->max_latency;
      }
      g_mutex_unlock (&priv->mutex);

      gst_query_set_latency (query, live, min, max);
      break;
    }
    case GST_QUERY_SCHEDULING:
    {
      gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
      gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);

      switch (priv->stream_type) {
        case GST_APP_STREAM_TYPE_STREAM:
        case GST_APP_STREAM_TYPE_SEEKABLE:
          break;
        case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
          gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
          break;
      }
      res = TRUE;
      break;
    }
    case GST_QUERY_DURATION:
    {
      GstFormat format;
      gst_query_parse_duration (query, &format, NULL);
      if (format == GST_FORMAT_BYTES) {
        gst_query_set_duration (query, format, priv->size);
        res = TRUE;
      } else if (format == GST_FORMAT_TIME) {
        if (priv->duration != GST_CLOCK_TIME_NONE) {
          gst_query_set_duration (query, format, priv->duration);
          res = TRUE;
        } else {
          res = FALSE;
        }
      } else {
        res = FALSE;
      }
      break;
    }
    default:
      res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
      break;
  }

  return res;
}

/* will be called in push mode */
static gboolean
gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
  GstAppSrcPrivate *priv = appsrc->priv;
  gint64 desired_position;
  gboolean res = FALSE;
  gboolean emit;
  Callbacks *callbacks = NULL;

  desired_position = segment->position;

  /* no need to try to seek in streaming mode */
  if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
    return TRUE;

  GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
      desired_position, gst_format_get_name (segment->format));

  g_mutex_lock (&priv->mutex);
  emit = priv->emit_signals;
  if (priv->callbacks)
    callbacks = callbacks_ref (priv->callbacks);
  g_mutex_unlock (&priv->mutex);

  if (callbacks && callbacks->callbacks.seek_data) {
    res =
        callbacks->callbacks.seek_data (appsrc, desired_position,
        callbacks->user_data);
  } else if (emit) {
    g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
        desired_position, &res);
  }

  g_clear_pointer (&callbacks, callbacks_unref);

  if (res) {
    GST_DEBUG_OBJECT (appsrc, "flushing queue");
    g_mutex_lock (&priv->mutex);
    gst_app_src_flush_queued (appsrc, TRUE);
    gst_segment_copy_into (segment, &priv->last_segment);
    gst_segment_copy_into (segment, &priv->current_segment);
    priv->pending_custom_segment = FALSE;
    g_mutex_unlock (&priv->mutex);
    priv->is_eos = FALSE;
  } else {
    GST_WARNING_OBJECT (appsrc, "seek failed");
  }

  return res;
}

/* must be called with the appsrc mutex */
static gboolean
gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
{
  gboolean res = FALSE;
  gboolean emit;
  GstAppSrcPrivate *priv = appsrc->priv;
  Callbacks *callbacks = NULL;

  emit = priv->emit_signals;
  if (priv->callbacks)
    callbacks = callbacks_ref (priv->callbacks);
  g_mutex_unlock (&priv->mutex);

  GST_DEBUG_OBJECT (appsrc,
      "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
      priv->offset, offset);

  if (callbacks && callbacks->callbacks.seek_data)
    res = callbacks->callbacks.seek_data (appsrc, offset, callbacks->user_data);
  else if (emit)
    g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
        offset, &res);

  g_clear_pointer (&callbacks, callbacks_unref);

  g_mutex_lock (&priv->mutex);

  return res;
}

/* must be called with the appsrc mutex. After this call things can be
 * flushing */
static void
gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
{
  gboolean emit;
  GstAppSrcPrivate *priv = appsrc->priv;
  Callbacks *callbacks = NULL;

  emit = priv->emit_signals;
  if (priv->callbacks)
    callbacks = callbacks_ref (priv->callbacks);
  g_mutex_unlock (&priv->mutex);

  /* we have no data, we need some. We fire the signal with the size hint. */
  if (callbacks && callbacks->callbacks.need_data)
    callbacks->callbacks.need_data (appsrc, size, callbacks->user_data);
  else if (emit)
    g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
        NULL);

  g_clear_pointer (&callbacks, callbacks_unref);

  g_mutex_lock (&priv->mutex);
  /* we can be flushing now because we released the lock */
}

/* must be called with the appsrc mutex */
static gboolean
gst_app_src_do_negotiate (GstBaseSrc * basesrc)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
  GstAppSrcPrivate *priv = appsrc->priv;
  gboolean result;
  GstCaps *caps;

  GST_OBJECT_LOCK (basesrc);
  caps = priv->current_caps ? gst_caps_ref (priv->current_caps) : NULL;
  GST_OBJECT_UNLOCK (basesrc);

  /* Avoid deadlock by unlocking mutex
   * otherwise we get deadlock between this and stream lock */
  g_mutex_unlock (&priv->mutex);
  if (caps) {
    result = gst_base_src_set_caps (basesrc, caps);
    gst_caps_unref (caps);
  } else {
    result = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc);
  }
  g_mutex_lock (&priv->mutex);

  return result;
}

static gboolean
gst_app_src_negotiate (GstBaseSrc * basesrc)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
  GstAppSrcPrivate *priv = appsrc->priv;
  gboolean result;

  g_mutex_lock (&priv->mutex);
  result = gst_app_src_do_negotiate (basesrc);
  g_mutex_unlock (&priv->mutex);
  return result;
}

/* Update the currently queued bytes/buffers/time information for the item
 * that was just removed from the queue.
 *
 * If update_offset is set, additionally the offset of the source will be
 * moved forward accordingly as if that many bytes were output.
 */
static void
gst_app_src_update_queued_pop (GstAppSrc * appsrc, GstMiniObject * item,
    gboolean update_offset)
{
  GstAppSrcPrivate *priv = appsrc->priv;
  guint buf_size = 0;
  guint n_buffers = 0;
  GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;

  if (GST_IS_BUFFER (item)) {
    GstBuffer *buf = GST_BUFFER_CAST (item);
    buf_size = gst_buffer_get_size (buf);
    n_buffers = 1;

    end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
    if (end_buffer_ts != GST_CLOCK_TIME_NONE
        && GST_BUFFER_DURATION_IS_VALID (buf))
      end_buffer_ts += GST_BUFFER_DURATION (buf);

    GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", buf, buf_size);
  } else if (GST_IS_BUFFER_LIST (item)) {
    GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
    guint i;

    n_buffers = gst_buffer_list_length (buffer_list);

    for (i = 0; i < n_buffers; i++) {
      GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
      GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);

      buf_size += gst_buffer_get_size (tmp);
      /* Update to the last buffer's timestamp that is known */
      if (ts != GST_CLOCK_TIME_NONE) {
        end_buffer_ts = ts;
        if (GST_BUFFER_DURATION_IS_VALID (tmp))
          end_buffer_ts += GST_BUFFER_DURATION (tmp);
      }
    }
  }

  priv->queued_bytes -= buf_size;
  priv->queued_buffers -= n_buffers;

  /* Update time level if working on a TIME segment */
  if ((priv->current_segment.format == GST_FORMAT_TIME
          || (priv->current_segment.format == GST_FORMAT_UNDEFINED
              && priv->last_segment.format == GST_FORMAT_TIME))
      && end_buffer_ts != GST_CLOCK_TIME_NONE) {
    const GstSegment *segment =
        priv->current_segment.format ==
        GST_FORMAT_TIME ? &priv->current_segment : &priv->last_segment;

    /* Clip to the current segment boundaries */
    if (segment->stop != -1 && end_buffer_ts > segment->stop)
      end_buffer_ts = segment->stop;
    else if (segment->start > end_buffer_ts)
      end_buffer_ts = segment->start;

    priv->last_out_running_time =
        gst_segment_to_running_time (segment, GST_FORMAT_TIME, end_buffer_ts);

    GST_TRACE_OBJECT (appsrc,
        "Last in running time %" GST_TIME_FORMAT ", last out running time %"
        GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
        GST_TIME_ARGS (priv->last_out_running_time));

    /* If timestamps on both sides are known, calculate the current
     * fill level in time and consider the queue empty if the output
     * running time is lower than the input one (i.e. some kind of reset
     * has happened).
     */
    if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
        && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
      if (priv->last_out_running_time > priv->last_in_running_time) {
        priv->queued_time = 0;
      } else {
        priv->queued_time =
            priv->last_in_running_time - priv->last_out_running_time;
      }
    }
  }

  GST_DEBUG_OBJECT (appsrc,
      "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
      " buffers, %" GST_TIME_FORMAT, priv->queued_bytes,
      priv->queued_buffers, GST_TIME_ARGS (priv->queued_time));

  /* only update the offset when in random_access mode and when requested by
   * the caller, i.e. not when just dropping the item */
  if (update_offset && priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
    priv->offset += buf_size;
}

/* Update the currently queued bytes/buffers/time information for the item
 * that was just added to the queue.
 */
static void
gst_app_src_update_queued_push (GstAppSrc * appsrc, GstMiniObject * item)
{
  GstAppSrcPrivate *priv = appsrc->priv;
  GstClockTime start_buffer_ts = GST_CLOCK_TIME_NONE;
  GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
  guint buf_size = 0;
  guint n_buffers = 0;

  if (GST_IS_BUFFER (item)) {
    GstBuffer *buf = GST_BUFFER_CAST (item);

    buf_size = gst_buffer_get_size (buf);
    n_buffers = 1;

    start_buffer_ts = end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
    if (end_buffer_ts != GST_CLOCK_TIME_NONE
        && GST_BUFFER_DURATION_IS_VALID (buf))
      end_buffer_ts += GST_BUFFER_DURATION (buf);
  } else if (GST_IS_BUFFER_LIST (item)) {
    GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
    guint i;

    n_buffers = gst_buffer_list_length (buffer_list);

    for (i = 0; i < n_buffers; i++) {
      GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
      GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);

      buf_size += gst_buffer_get_size (tmp);

      if (ts != GST_CLOCK_TIME_NONE) {
        if (start_buffer_ts == GST_CLOCK_TIME_NONE)
          start_buffer_ts = ts;
        end_buffer_ts = ts;
        if (GST_BUFFER_DURATION_IS_VALID (tmp))
          end_buffer_ts += GST_BUFFER_DURATION (tmp);
      }
    }
  }

  priv->queued_bytes += buf_size;
  priv->queued_buffers += n_buffers;

  /* Update time level if working on a TIME segment */
  if (priv->last_segment.format == GST_FORMAT_TIME
      && end_buffer_ts != GST_CLOCK_TIME_NONE) {
    /* Clip to the last segment boundaries */
    if (priv->last_segment.stop != -1
        && end_buffer_ts > priv->last_segment.stop)
      end_buffer_ts = priv->last_segment.stop;
    else if (priv->last_segment.start > end_buffer_ts)
      end_buffer_ts = priv->last_segment.start;

    priv->last_in_running_time =
        gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
        end_buffer_ts);

    /* If this is the only buffer then we can directly update the queued time
     * here. This is especially useful if this was the first buffer because
     * otherwise we would have to wait until it is actually unqueued to know
     * the queued duration */
    if (priv->queued_buffers == 1) {
      if (priv->last_segment.stop != -1
          && start_buffer_ts > priv->last_segment.stop)
        start_buffer_ts = priv->last_segment.stop;
      else if (priv->last_segment.start > start_buffer_ts)
        start_buffer_ts = priv->last_segment.start;

      priv->last_out_running_time =
          gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
          start_buffer_ts);
    }

    GST_TRACE_OBJECT (appsrc,
        "Last in running time %" GST_TIME_FORMAT ", last out running time %"
        GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
        GST_TIME_ARGS (priv->last_out_running_time));

    if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
        && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
      if (priv->last_out_running_time > priv->last_in_running_time) {
        priv->queued_time = 0;
      } else {
        priv->queued_time =
            priv->last_in_running_time - priv->last_out_running_time;
      }
    }
  }

  GST_DEBUG_OBJECT (appsrc,
      "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
      " buffers, %" GST_TIME_FORMAT, priv->queued_bytes, priv->queued_buffers,
      GST_TIME_ARGS (priv->queued_time));
}

static GstFlowReturn
gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
    GstBuffer ** buf)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
  GstAppSrcPrivate *priv = appsrc->priv;
  GstFlowReturn ret;

  GST_OBJECT_LOCK (appsrc);
  if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
          bsrc->segment.format == GST_FORMAT_BYTES)) {
    GST_DEBUG_OBJECT (appsrc,
        "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
        bsrc->segment.duration, priv->size);
    bsrc->segment.duration = priv->size;
    GST_OBJECT_UNLOCK (appsrc);

    gst_element_post_message (GST_ELEMENT (appsrc),
        gst_message_new_duration_changed (GST_OBJECT (appsrc)));
  } else if (G_UNLIKELY (priv->duration != bsrc->segment.duration &&
          bsrc->segment.format == GST_FORMAT_TIME)) {
    GST_DEBUG_OBJECT (appsrc,
        "Duration changed from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
        GST_TIME_ARGS (bsrc->segment.duration), GST_TIME_ARGS (priv->duration));
    bsrc->segment.duration = priv->duration;
    GST_OBJECT_UNLOCK (appsrc);

    gst_element_post_message (GST_ELEMENT (appsrc),
        gst_message_new_duration_changed (GST_OBJECT (appsrc)));
  } else {
    GST_OBJECT_UNLOCK (appsrc);
  }

  g_mutex_lock (&priv->mutex);
  /* check flushing first */
  if (G_UNLIKELY (priv->flushing))
    goto flushing;

  if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
    /* if we are dealing with a random-access stream, issue a seek if the offset
     * changed. */
    if (G_UNLIKELY (priv->offset != offset)) {
      gboolean res;

      /* do the seek */
      res = gst_app_src_emit_seek (appsrc, offset);

      if (G_UNLIKELY (!res))
        /* failing to seek is fatal */
        goto seek_error;

      priv->offset = offset;
      priv->is_eos = FALSE;
    }
  }

  while (TRUE) {
    /* Our lock may have been release to push events or caps, check out
     * state in case we are now flushing. */
    if (G_UNLIKELY (priv->flushing))
      goto flushing;

    /* return data as long as we have some */
    if (!gst_queue_array_is_empty (priv->queue)) {
      GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);

      if (GST_IS_CAPS (obj)) {
        GstCaps *next_caps = GST_CAPS (obj);
        gboolean caps_changed = TRUE;

        if (next_caps && priv->current_caps)
          caps_changed = !gst_caps_is_equal (next_caps, priv->current_caps);
        else
          caps_changed = (next_caps != priv->current_caps);

        gst_caps_replace (&priv->current_caps, next_caps);

        if (next_caps) {
          gst_caps_unref (next_caps);
        }

        if (caps_changed)
          gst_app_src_do_negotiate (bsrc);

        /* Continue checks caps and queue */
        continue;
      }

      if (GST_IS_BUFFER (obj)) {
        GstBuffer *buffer = GST_BUFFER (obj);

        /* Mark the buffer as DISCONT if we previously dropped a buffer
         * instead of outputting it */
        if (priv->need_discont_downstream) {
          buffer = gst_buffer_make_writable (buffer);
          GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
          priv->need_discont_downstream = FALSE;
        }

        *buf = buffer;
      } else if (GST_IS_BUFFER_LIST (obj)) {
        GstBufferList *buffer_list;

        buffer_list = GST_BUFFER_LIST (obj);

        /* Mark the first buffer of the buffer list as DISCONT if we
         * previously dropped a buffer instead of outputting it */
        if (priv->need_discont_downstream) {
          GstBuffer *buffer;

          buffer_list = gst_buffer_list_make_writable (buffer_list);
          buffer = gst_buffer_list_get_writable (buffer_list, 0);
          GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
          priv->need_discont_downstream = FALSE;
        }

        gst_base_src_submit_buffer_list (bsrc, buffer_list);
        *buf = NULL;
      } else if (GST_IS_EVENT (obj)) {
        GstEvent *event = GST_EVENT (obj);

        GST_DEBUG_OBJECT (appsrc, "pop event %" GST_PTR_FORMAT, event);

        if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
          const GstSegment *segment = NULL;

          gst_event_parse_segment (event, &segment);
          g_assert (segment != NULL);

          if (!gst_segment_is_equal (&priv->current_segment, segment)) {
            GST_DEBUG_OBJECT (appsrc,
                "Update new segment %" GST_PTR_FORMAT, event);
            if (!gst_base_src_new_segment (bsrc, segment)) {
              GST_ERROR_OBJECT (appsrc,
                  "Couldn't set new segment %" GST_PTR_FORMAT, event);
              gst_event_unref (event);
              goto invalid_segment;
            }
            gst_segment_copy_into (segment, &priv->current_segment);
          }

          gst_event_unref (event);
        } else {
          GstEvent *seg_event;
          GstSegment last_segment = priv->last_segment;

          /* event is serialized with the buffers flow */

          /* We are about to push an event, release out lock */
          g_mutex_unlock (&priv->mutex);

          seg_event =
              gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc),
              GST_EVENT_SEGMENT, 0);
          if (!seg_event) {
            seg_event = gst_event_new_segment (&last_segment);

            GST_DEBUG_OBJECT (appsrc,
                "received serialized event before first buffer, push default segment %"
                GST_PTR_FORMAT, seg_event);

            gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), seg_event);
          } else {
            gst_event_unref (seg_event);
          }

          gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event);

          g_mutex_lock (&priv->mutex);
        }
        continue;
      } else {
        g_assert_not_reached ();
      }

      gst_app_src_update_queued_pop (appsrc, obj, TRUE);

      /* signal that we removed an item */
      if ((priv->wait_status & APP_WAITING))
        g_cond_broadcast (&priv->cond);

      /* see if we go lower than the min-percent */
      if (priv->min_percent) {
        if ((priv->max_bytes
                && priv->queued_bytes * 100 / priv->max_bytes <=
                priv->min_percent) || (priv->max_buffers
                && priv->queued_buffers * 100 / priv->max_buffers <=
                priv->min_percent) || (priv->max_time
                && priv->queued_time * 100 / priv->max_time <=
                priv->min_percent)) {
          /* ignore flushing state, we got a buffer and we will return it now.
           * Errors will be handled in the next round */
          gst_app_src_emit_need_data (appsrc, size);
        }
      }
      ret = GST_FLOW_OK;
      break;
    } else {
      gst_app_src_emit_need_data (appsrc, size);

      /* we can be flushing now because we released the lock above */
      if (G_UNLIKELY (priv->flushing))
        goto flushing;

      /* if we have a buffer now, continue the loop and try to return it. In
       * random-access mode (where a buffer is normally pushed in the above
       * signal) we can still be empty because the pushed buffer got flushed or
       * when the application pushes the requested buffer later, we support both
       * possibilities. */
      if (!gst_queue_array_is_empty (priv->queue))
        continue;

      /* no buffer yet, maybe we are EOS, if not, block for more data. */
    }

    /* check EOS */
    if (G_UNLIKELY (priv->is_eos))
      goto eos;

    /* nothing to return, wait a while for new data or flushing. */
    priv->wait_status |= STREAM_WAITING;
    g_cond_wait (&priv->cond, &priv->mutex);
    priv->wait_status &= ~STREAM_WAITING;
  }
  g_mutex_unlock (&priv->mutex);
  return ret;

  /* ERRORS */
flushing:
  {
    GST_DEBUG_OBJECT (appsrc, "we are flushing");
    g_mutex_unlock (&priv->mutex);
    return GST_FLOW_FLUSHING;
  }
eos:
  {
    GST_DEBUG_OBJECT (appsrc, "we are EOS");
    g_mutex_unlock (&priv->mutex);
    return GST_FLOW_EOS;
  }
seek_error:
  {
    g_mutex_unlock (&priv->mutex);
    GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
        GST_ERROR_SYSTEM);
    return GST_FLOW_ERROR;
  }

invalid_segment:
  {
    g_mutex_unlock (&priv->mutex);
    GST_ELEMENT_ERROR (appsrc, LIBRARY, SETTINGS,
        (NULL), ("Failed to configure the provided input segment."));
    return GST_FLOW_ERROR;
  }
}

/* external API */

/**
 * gst_app_src_set_caps:
 * @appsrc: a #GstAppSrc
 * @caps: (nullable): caps to set
 *
 * Set the capabilities on the appsrc element.  This function takes
 * a copy of the caps structure. After calling this method, the source will
 * only produce caps that match @caps. @caps must be fixed and the caps on the
 * buffers must match the caps or left NULL.
 */
void
gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
{
  GstAppSrcPrivate *priv;
  gboolean caps_changed;

  g_return_if_fail (GST_IS_APP_SRC (appsrc));

  priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);

  GST_OBJECT_LOCK (appsrc);
  if (caps && priv->last_caps)
    caps_changed = !gst_caps_is_equal (caps, priv->last_caps);
  else
    caps_changed = (caps != priv->last_caps);

  if (caps_changed) {
    GstCaps *new_caps;
    gpointer t;

    new_caps = caps ? gst_caps_copy (caps) : NULL;
    GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);

    while ((t = gst_queue_array_peek_tail (priv->queue)) && GST_IS_CAPS (t)) {
      gst_caps_unref (gst_queue_array_pop_tail (priv->queue));
    }
    gst_queue_array_push_tail (priv->queue, new_caps);
    gst_caps_replace (&priv->last_caps, new_caps);

    if ((priv->wait_status & STREAM_WAITING))
      g_cond_broadcast (&priv->cond);
  }

  GST_OBJECT_UNLOCK (appsrc);

  g_mutex_unlock (&priv->mutex);
}

/**
 * gst_app_src_get_caps:
 * @appsrc: a #GstAppSrc
 *
 * Get the configured caps on @appsrc.
 *
 * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
 */
GstCaps *
gst_app_src_get_caps (GstAppSrc * appsrc)
{

  GstCaps *caps;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);

  GST_OBJECT_LOCK (appsrc);
  if ((caps = appsrc->priv->last_caps))
    gst_caps_ref (caps);
  GST_OBJECT_UNLOCK (appsrc);

  return caps;

}

/**
 * gst_app_src_set_size:
 * @appsrc: a #GstAppSrc
 * @size: the size to set
 *
 * Set the size of the stream in bytes. A value of -1 means that the size is
 * not known.
 */
void
gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
{
  GstAppSrcPrivate *priv;

  g_return_if_fail (GST_IS_APP_SRC (appsrc));

  priv = appsrc->priv;

  GST_OBJECT_LOCK (appsrc);
  GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
  priv->size = size;
  GST_OBJECT_UNLOCK (appsrc);
}

/**
 * gst_app_src_get_size:
 * @appsrc: a #GstAppSrc
 *
 * Get the size of the stream in bytes. A value of -1 means that the size is
 * not known.
 *
 * Returns: the size of the stream previously set with gst_app_src_set_size();
 */
gint64
gst_app_src_get_size (GstAppSrc * appsrc)
{
  gint64 size;
  GstAppSrcPrivate *priv;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);

  priv = appsrc->priv;

  GST_OBJECT_LOCK (appsrc);
  size = priv->size;
  GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
  GST_OBJECT_UNLOCK (appsrc);

  return size;
}

/**
 * gst_app_src_set_duration:
 * @appsrc: a #GstAppSrc
 * @duration: the duration to set
 *
 * Set the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
 * not known.
 *
 * Since: 1.10
 */
void
gst_app_src_set_duration (GstAppSrc * appsrc, GstClockTime duration)
{
  GstAppSrcPrivate *priv;

  g_return_if_fail (GST_IS_APP_SRC (appsrc));

  priv = appsrc->priv;

  GST_OBJECT_LOCK (appsrc);
  GST_DEBUG_OBJECT (appsrc, "setting duration of %" GST_TIME_FORMAT,
      GST_TIME_ARGS (duration));
  priv->duration = duration;
  GST_OBJECT_UNLOCK (appsrc);
}

/**
 * gst_app_src_get_duration:
 * @appsrc: a #GstAppSrc
 *
 * Get the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
 * not known.
 *
 * Returns: the duration of the stream previously set with gst_app_src_set_duration();
 *
 * Since: 1.10
 */
GstClockTime
gst_app_src_get_duration (GstAppSrc * appsrc)
{
  GstClockTime duration;
  GstAppSrcPrivate *priv;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);

  priv = appsrc->priv;

  GST_OBJECT_LOCK (appsrc);
  duration = priv->duration;
  GST_DEBUG_OBJECT (appsrc, "getting duration of %" GST_TIME_FORMAT,
      GST_TIME_ARGS (duration));
  GST_OBJECT_UNLOCK (appsrc);

  return duration;
}

/**
 * gst_app_src_set_stream_type:
 * @appsrc: a #GstAppSrc
 * @type: the new state
 *
 * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
 * be connected to.
 *
 * A stream_type stream
 */
void
gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
{
  GstAppSrcPrivate *priv;

  g_return_if_fail (GST_IS_APP_SRC (appsrc));

  priv = appsrc->priv;

  GST_OBJECT_LOCK (appsrc);
  GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
  priv->stream_type = type;
  GST_OBJECT_UNLOCK (appsrc);
}

/**
 * gst_app_src_get_stream_type:
 * @appsrc: a #GstAppSrc
 *
 * Get the stream type. Control the stream type of @appsrc
 * with gst_app_src_set_stream_type().
 *
 * Returns: the stream type.
 */
GstAppStreamType
gst_app_src_get_stream_type (GstAppSrc * appsrc)
{
  gboolean stream_type;
  GstAppSrcPrivate *priv;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);

  priv = appsrc->priv;

  GST_OBJECT_LOCK (appsrc);
  stream_type = priv->stream_type;
  GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
  GST_OBJECT_UNLOCK (appsrc);

  return stream_type;
}

/**
 * gst_app_src_set_max_bytes:
 * @appsrc: a #GstAppSrc
 * @max: the maximum number of bytes to queue
 *
 * Set the maximum amount of bytes that can be queued in @appsrc.
 * After the maximum amount of bytes are queued, @appsrc will emit the
 * "enough-data" signal.
 */
void
gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
{
  GstAppSrcPrivate *priv;

  g_return_if_fail (GST_IS_APP_SRC (appsrc));

  priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);
  if (max != priv->max_bytes) {
    GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
    priv->max_bytes = max;
    /* signal the change */
    g_cond_broadcast (&priv->cond);
  }
  g_mutex_unlock (&priv->mutex);
}

/**
 * gst_app_src_get_max_bytes:
 * @appsrc: a #GstAppSrc
 *
 * Get the maximum amount of bytes that can be queued in @appsrc.
 *
 * Returns: The maximum amount of bytes that can be queued.
 */
guint64
gst_app_src_get_max_bytes (GstAppSrc * appsrc)
{
  guint64 result;
  GstAppSrcPrivate *priv;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);

  priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);
  result = priv->max_bytes;
  GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
  g_mutex_unlock (&priv->mutex);

  return result;
}

/**
 * gst_app_src_get_current_level_bytes:
 * @appsrc: a #GstAppSrc
 *
 * Get the number of currently queued bytes inside @appsrc.
 *
 * Returns: The number of currently queued bytes.
 *
 * Since: 1.2
 */
guint64
gst_app_src_get_current_level_bytes (GstAppSrc * appsrc)
{
  guint64 queued;
  GstAppSrcPrivate *priv;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);

  priv = appsrc->priv;

  GST_OBJECT_LOCK (appsrc);
  queued = priv->queued_bytes;
  GST_DEBUG_OBJECT (appsrc, "current level bytes is %" G_GUINT64_FORMAT,
      queued);
  GST_OBJECT_UNLOCK (appsrc);

  return queued;
}

/**
 * gst_app_src_set_max_buffers:
 * @appsrc: a #GstAppSrc
 * @max: the maximum number of buffers to queue
 *
 * Set the maximum amount of buffers that can be queued in @appsrc.
 * After the maximum amount of buffers are queued, @appsrc will emit the
 * "enough-data" signal.
 *
 * Since: 1.20
 */
void
gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint64 max)
{
  GstAppSrcPrivate *priv;

  g_return_if_fail (GST_IS_APP_SRC (appsrc));

  priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);
  if (max != priv->max_buffers) {
    GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %" G_GUINT64_FORMAT, max);
    priv->max_buffers = max;
    /* signal the change */
    g_cond_broadcast (&priv->cond);
  }
  g_mutex_unlock (&priv->mutex);
}

/**
 * gst_app_src_get_max_buffers:
 * @appsrc: a #GstAppSrc
 *
 * Get the maximum amount of buffers that can be queued in @appsrc.
 *
 * Returns: The maximum amount of buffers that can be queued.
 *
 * Since: 1.20
 */
guint64
gst_app_src_get_max_buffers (GstAppSrc * appsrc)
{
  guint64 result;
  GstAppSrcPrivate *priv;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);

  priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);
  result = priv->max_buffers;
  GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %" G_GUINT64_FORMAT,
      result);
  g_mutex_unlock (&priv->mutex);

  return result;
}

/**
 * gst_app_src_get_current_level_buffers:
 * @appsrc: a #GstAppSrc
 *
 * Get the number of currently queued buffers inside @appsrc.
 *
 * Returns: The number of currently queued buffers.
 *
 * Since: 1.20
 */
guint64
gst_app_src_get_current_level_buffers (GstAppSrc * appsrc)
{
  guint64 queued;
  GstAppSrcPrivate *priv;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);

  priv = appsrc->priv;

  GST_OBJECT_LOCK (appsrc);
  queued = priv->queued_buffers;
  GST_DEBUG_OBJECT (appsrc, "current level buffers is %" G_GUINT64_FORMAT,
      queued);
  GST_OBJECT_UNLOCK (appsrc);

  return queued;
}

/**
 * gst_app_src_set_max_time:
 * @appsrc: a #GstAppSrc
 * @max: the maximum amonut of time to queue
 *
 * Set the maximum amount of time that can be queued in @appsrc.
 * After the maximum amount of time are queued, @appsrc will emit the
 * "enough-data" signal.
 *
 * Since: 1.20
 */
void
gst_app_src_set_max_time (GstAppSrc * appsrc, GstClockTime max)
{
  GstAppSrcPrivate *priv;

  g_return_if_fail (GST_IS_APP_SRC (appsrc));

  priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);
  if (max != priv->max_time) {
    GST_DEBUG_OBJECT (appsrc, "setting max-time to %" GST_TIME_FORMAT,
        GST_TIME_ARGS (max));
    priv->max_time = max;
    /* signal the change */
    g_cond_broadcast (&priv->cond);
  }
  g_mutex_unlock (&priv->mutex);
}

/**
 * gst_app_src_get_max_time:
 * @appsrc: a #GstAppSrc
 *
 * Get the maximum amount of time that can be queued in @appsrc.
 *
 * Returns: The maximum amount of time that can be queued.
 *
 * Since: 1.20
 */
GstClockTime
gst_app_src_get_max_time (GstAppSrc * appsrc)
{
  GstClockTime result;
  GstAppSrcPrivate *priv;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);

  priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);
  result = priv->max_time;
  GST_DEBUG_OBJECT (appsrc, "getting max-time of %" GST_TIME_FORMAT,
      GST_TIME_ARGS (result));
  g_mutex_unlock (&priv->mutex);

  return result;
}

/**
 * gst_app_src_get_current_level_time:
 * @appsrc: a #GstAppSrc
 *
 * Get the amount of currently queued time inside @appsrc.
 *
 * Returns: The amount of currently queued time.
 *
 * Since: 1.20
 */
GstClockTime
gst_app_src_get_current_level_time (GstAppSrc * appsrc)
{
  gint64 queued;
  GstAppSrcPrivate *priv;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);

  priv = appsrc->priv;

  GST_OBJECT_LOCK (appsrc);
  queued = priv->queued_time;
  GST_DEBUG_OBJECT (appsrc, "current level time is %" GST_TIME_FORMAT,
      GST_TIME_ARGS (queued));
  GST_OBJECT_UNLOCK (appsrc);

  return queued;
}

static void
gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
    gboolean do_max, guint64 max)
{
  GstAppSrcPrivate *priv = appsrc->priv;
  gboolean changed = FALSE;

  g_mutex_lock (&priv->mutex);
  if (do_min && priv->min_latency != min) {
    priv->min_latency = min;
    changed = TRUE;
  }
  if (do_max && priv->max_latency != max) {
    priv->max_latency = max;
    changed = TRUE;
  }
  g_mutex_unlock (&priv->mutex);

  if (changed) {
    GST_DEBUG_OBJECT (appsrc, "posting latency changed");
    gst_element_post_message (GST_ELEMENT_CAST (appsrc),
        gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
  }
}

/**
 * gst_app_src_set_leaky_type:
 * @appsrc: a #GstAppSrc
 * @leaky: the #GstAppLeakyType
 *
 * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
 * will drop any buffers that are pushed into it once its internal queue is
 * full. The selected type defines whether to drop the oldest or new
 * buffers.
 *
 * Since: 1.20
 */
void
gst_app_src_set_leaky_type (GstAppSrc * appsrc, GstAppLeakyType leaky)
{
  g_return_if_fail (GST_IS_APP_SRC (appsrc));

  appsrc->priv->leaky_type = leaky;
}

/**
 * gst_app_src_get_leaky_type:
 * @appsrc: a #GstAppSrc
 *
 * Returns the currently set #GstAppLeakyType. See gst_app_src_set_leaky_type()
 * for more details.
 *
 * Returns: The currently set #GstAppLeakyType.
 *
 * Since: 1.20
 */
GstAppLeakyType
gst_app_src_get_leaky_type (GstAppSrc * appsrc)
{
  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_APP_LEAKY_TYPE_NONE);

  return appsrc->priv->leaky_type;
}

/**
 * gst_app_src_set_latency:
 * @appsrc: a #GstAppSrc
 * @min: the min latency
 * @max: the max latency
 *
 * Configure the @min and @max latency in @src. If @min is set to -1, the
 * default latency calculations for pseudo-live sources will be used.
 */
void
gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
{
  gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
}

/**
 * gst_app_src_get_latency:
 * @appsrc: a #GstAppSrc
 * @min: (out): the min latency
 * @max: (out): the max latency
 *
 * Retrieve the min and max latencies in @min and @max respectively.
 */
void
gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
{
  GstAppSrcPrivate *priv;

  g_return_if_fail (GST_IS_APP_SRC (appsrc));

  priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);
  if (min)
    *min = priv->min_latency;
  if (max)
    *max = priv->max_latency;
  g_mutex_unlock (&priv->mutex);
}

/**
 * gst_app_src_set_emit_signals:
 * @appsrc: a #GstAppSrc
 * @emit: the new state
 *
 * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
 * by default disabled because signal emission is expensive and unneeded when
 * the application prefers to operate in pull mode.
 */
void
gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
{
  GstAppSrcPrivate *priv;

  g_return_if_fail (GST_IS_APP_SRC (appsrc));

  priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);
  priv->emit_signals = emit;
  g_mutex_unlock (&priv->mutex);
}

/**
 * gst_app_src_get_emit_signals:
 * @appsrc: a #GstAppSrc
 *
 * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
 *
 * Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer"
 * signals.
 */
gboolean
gst_app_src_get_emit_signals (GstAppSrc * appsrc)
{
  gboolean result;
  GstAppSrcPrivate *priv;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);

  priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);
  result = priv->emit_signals;
  g_mutex_unlock (&priv->mutex);

  return result;
}

static GstFlowReturn
gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
    GstBufferList * buflist, gboolean steal_ref)
{
  gboolean first = TRUE;
  GstAppSrcPrivate *priv;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);

  priv = appsrc->priv;

  if (buffer != NULL)
    g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
  else
    g_return_val_if_fail (GST_IS_BUFFER_LIST (buflist), GST_FLOW_ERROR);

  if (buflist != NULL) {
    if (gst_buffer_list_length (buflist) == 0)
      return GST_FLOW_OK;

    buffer = gst_buffer_list_get (buflist, 0);
  }

  if (GST_BUFFER_DTS (buffer) == GST_CLOCK_TIME_NONE &&
      GST_BUFFER_PTS (buffer) == GST_CLOCK_TIME_NONE &&
      gst_base_src_get_do_timestamp (GST_BASE_SRC_CAST (appsrc))) {
    GstClock *clock;

    clock = gst_element_get_clock (GST_ELEMENT_CAST (appsrc));
    if (clock) {
      GstClockTime now;
      GstClockTime base_time =
          gst_element_get_base_time (GST_ELEMENT_CAST (appsrc));

      now = gst_clock_get_time (clock);
      if (now > base_time)
        now -= base_time;
      else
        now = 0;
      gst_object_unref (clock);

      if (buflist == NULL) {
        if (!steal_ref) {
          buffer = gst_buffer_copy (buffer);
          steal_ref = TRUE;
        } else {
          buffer = gst_buffer_make_writable (buffer);
        }
      } else {
        if (!steal_ref) {
          buflist = gst_buffer_list_copy (buflist);
          steal_ref = TRUE;
        } else {
          buflist = gst_buffer_list_make_writable (buflist);
        }
        buffer = gst_buffer_list_get_writable (buflist, 0);
      }

      GST_BUFFER_PTS (buffer) = now;
      GST_BUFFER_DTS (buffer) = now;
    } else {
      GST_WARNING_OBJECT (appsrc,
          "do-timestamp=TRUE but buffers are provided before "
          "reaching the PLAYING state and having a clock. Timestamps will "
          "not be accurate!");
    }
  }

  g_mutex_lock (&priv->mutex);

  while (TRUE) {
    /* can't accept buffers when we are flushing or EOS */
    if (priv->flushing)
      goto flushing;

    if (priv->is_eos)
      goto eos;

    if ((priv->max_bytes && priv->queued_bytes >= priv->max_bytes) ||
        (priv->max_buffers && priv->queued_buffers >= priv->max_buffers) ||
        (priv->max_time && priv->queued_time >= priv->max_time)) {
      GST_DEBUG_OBJECT (appsrc,
          "queue filled (queued %" G_GUINT64_FORMAT " bytes, max %"
          G_GUINT64_FORMAT " bytes, " "queued %" G_GUINT64_FORMAT
          " buffers, max %" G_GUINT64_FORMAT " buffers, " "queued %"
          GST_TIME_FORMAT " time, max %" GST_TIME_FORMAT " time)",
          priv->queued_bytes, priv->max_bytes, priv->queued_buffers,
          priv->max_buffers, GST_TIME_ARGS (priv->queued_time),
          GST_TIME_ARGS (priv->max_time));

      if (first) {
        Callbacks *callbacks = NULL;
        gboolean emit;

        emit = priv->emit_signals;
        if (priv->callbacks)
          callbacks = callbacks_ref (priv->callbacks);
        /* only signal on the first push */
        g_mutex_unlock (&priv->mutex);

        if (callbacks && callbacks->callbacks.enough_data)
          callbacks->callbacks.enough_data (appsrc, callbacks->user_data);
        else if (emit)
          g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
              NULL);

        g_clear_pointer (&callbacks, callbacks_unref);

        g_mutex_lock (&priv->mutex);
      }

      if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) {
        priv->need_discont_upstream = TRUE;
        goto dropped;
      } else if (priv->leaky_type == GST_APP_LEAKY_TYPE_DOWNSTREAM) {
        guint i, length = gst_queue_array_get_length (priv->queue);
        GstMiniObject *item = NULL;

        /* Find the oldest buffer or buffer list and drop it, then update the
         * limits. Dropping one is sufficient to go below the limits again.
         */
        for (i = 0; i < length; i++) {
          item = gst_queue_array_peek_nth (priv->queue, i);
          if (GST_IS_BUFFER (item) || GST_IS_BUFFER_LIST (item)) {
            gst_queue_array_drop_element (priv->queue, i);
            break;
          }
          /* To not accidentally have an event after the loop */
          item = NULL;
        }

        if (!item) {
          GST_FIXME_OBJECT (appsrc,
              "No buffer or buffer list queued but queue is full");
          /* This shouldn't really happen but in this case we can't really do
           * anything apart from accepting the buffer / bufferlist */
          break;
        }

        GST_WARNING_OBJECT (appsrc, "Dropping old item %" GST_PTR_FORMAT, item);

        gst_app_src_update_queued_pop (appsrc, item, FALSE);
        gst_mini_object_unref (item);

        priv->need_discont_downstream = TRUE;
        continue;
      }

      if (first) {
        /* continue to check for flushing/eos after releasing the lock */
        first = FALSE;
        continue;
      }
      if (priv->block) {
        GST_DEBUG_OBJECT (appsrc, "waiting for free space");
        /* we are filled, wait until a buffer gets popped or when we
         * flush. */
        priv->wait_status |= APP_WAITING;
        g_cond_wait (&priv->cond, &priv->mutex);
        priv->wait_status &= ~APP_WAITING;
      } else {
        /* no need to wait for free space, we just pump more data into the
         * queue hoping that the caller reacts to the enough-data signal and
         * stops pushing buffers. */
        break;
      }
    } else {
      break;
    }
  }

  if (priv->pending_custom_segment) {
    GstEvent *event = gst_event_new_segment (&priv->last_segment);

    GST_DEBUG_OBJECT (appsrc, "enqueue new segment %" GST_PTR_FORMAT, event);
    gst_queue_array_push_tail (priv->queue, event);
    priv->pending_custom_segment = FALSE;
  }

  if (buflist != NULL) {
    /* Mark the first buffer of the buffer list as DISCONT if we previously
     * dropped a buffer instead of queueing it */
    if (priv->need_discont_upstream) {
      if (!steal_ref) {
        buflist = gst_buffer_list_copy (buflist);
        steal_ref = TRUE;
      } else {
        buflist = gst_buffer_list_make_writable (buflist);
      }
      buffer = gst_buffer_list_get_writable (buflist, 0);
      GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
      priv->need_discont_upstream = FALSE;
    }

    GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);

    if (!steal_ref)
      gst_buffer_list_ref (buflist);
    gst_queue_array_push_tail (priv->queue, buflist);
  } else {
    /* Mark the buffer as DISCONT if we previously dropped a buffer instead of
     * queueing it */
    if (priv->need_discont_upstream) {
      if (!steal_ref) {
        buffer = gst_buffer_copy (buffer);
        steal_ref = TRUE;
      } else {
        buffer = gst_buffer_make_writable (buffer);
      }
      GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
      priv->need_discont_upstream = FALSE;
    }

    GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
    if (!steal_ref)
      gst_buffer_ref (buffer);
    gst_queue_array_push_tail (priv->queue, buffer);
  }

  gst_app_src_update_queued_push (appsrc,
      buflist ? GST_MINI_OBJECT_CAST (buflist) : GST_MINI_OBJECT_CAST (buffer));

  if ((priv->wait_status & STREAM_WAITING))
    g_cond_broadcast (&priv->cond);

  g_mutex_unlock (&priv->mutex);

  return GST_FLOW_OK;

  /* ERRORS */
flushing:
  {
    GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
    if (steal_ref) {
      if (buflist)
        gst_buffer_list_unref (buflist);
      else
        gst_buffer_unref (buffer);
    }
    g_mutex_unlock (&priv->mutex);
    return GST_FLOW_FLUSHING;
  }
eos:
  {
    GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
    if (steal_ref) {
      if (buflist)
        gst_buffer_list_unref (buflist);
      else
        gst_buffer_unref (buffer);
    }
    g_mutex_unlock (&priv->mutex);
    return GST_FLOW_EOS;
  }
dropped:
  {
    GST_DEBUG_OBJECT (appsrc, "dropped new buffer %p, we are full", buffer);
    if (steal_ref) {
      if (buflist)
        gst_buffer_list_unref (buflist);
      else
        gst_buffer_unref (buffer);
    }
    g_mutex_unlock (&priv->mutex);
    return GST_FLOW_EOS;
  }
}

static GstFlowReturn
gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
    gboolean steal_ref)
{
  return gst_app_src_push_internal (appsrc, buffer, NULL, steal_ref);
}

static GstFlowReturn
gst_app_src_push_sample_internal (GstAppSrc * appsrc, GstSample * sample)
{
  GstAppSrcPrivate *priv = appsrc->priv;
  GstBufferList *buffer_list;
  GstBuffer *buffer;
  GstCaps *caps;

  g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);

  caps = gst_sample_get_caps (sample);
  if (caps != NULL) {
    gst_app_src_set_caps (appsrc, caps);
  } else {
    GST_WARNING_OBJECT (appsrc, "received sample without caps");
  }

  if (priv->handle_segment_change && priv->format == GST_FORMAT_TIME) {
    GstSegment *segment = gst_sample_get_segment (sample);

    if (segment->format != GST_FORMAT_TIME) {
      GST_LOG_OBJECT (appsrc, "format %s is not supported",
          gst_format_get_name (segment->format));
      goto handle_buffer;
    }

    g_mutex_lock (&priv->mutex);
    if (gst_segment_is_equal (&priv->last_segment, segment)) {
      GST_LOG_OBJECT (appsrc, "segment wasn't changed");
      g_mutex_unlock (&priv->mutex);
      goto handle_buffer;
    } else {
      GST_LOG_OBJECT (appsrc,
          "segment changed %" GST_SEGMENT_FORMAT " -> %" GST_SEGMENT_FORMAT,
          &priv->last_segment, segment);
    }

    /* will be pushed to queue with next buffer/buffer-list */
    gst_segment_copy_into (segment, &priv->last_segment);
    priv->pending_custom_segment = TRUE;
    g_mutex_unlock (&priv->mutex);
  }

handle_buffer:

  buffer = gst_sample_get_buffer (sample);
  if (buffer != NULL)
    return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);

  buffer_list = gst_sample_get_buffer_list (sample);
  if (buffer_list != NULL)
    return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);

  GST_WARNING_OBJECT (appsrc, "received sample without buffer or buffer list");
  return GST_FLOW_OK;
}

/**
 * gst_app_src_push_buffer:
 * @appsrc: a #GstAppSrc
 * @buffer: (transfer full): a #GstBuffer to push
 *
 * Adds a buffer to the queue of buffers that the appsrc element will
 * push to its source pad.  This function takes ownership of the buffer.
 *
 * When the block property is TRUE, this function can block until free
 * space becomes available in the queue.
 *
 * Returns: #GST_FLOW_OK when the buffer was successfully queued.
 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
 * #GST_FLOW_EOS when EOS occurred.
 */
GstFlowReturn
gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
{
  return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
}

/**
 * gst_app_src_push_buffer_list:
 * @appsrc: a #GstAppSrc
 * @buffer_list: (transfer full): a #GstBufferList to push
 *
 * Adds a buffer list to the queue of buffers and buffer lists that the
 * appsrc element will push to its source pad.  This function takes ownership
 * of @buffer_list.
 *
 * When the block property is TRUE, this function can block until free
 * space becomes available in the queue.
 *
 * Returns: #GST_FLOW_OK when the buffer list was successfully queued.
 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
 * #GST_FLOW_EOS when EOS occurred.
 *
 * Since: 1.14
 */
GstFlowReturn
gst_app_src_push_buffer_list (GstAppSrc * appsrc, GstBufferList * buffer_list)
{
  return gst_app_src_push_internal (appsrc, NULL, buffer_list, TRUE);
}

/**
 * gst_app_src_push_sample:
 * @appsrc: a #GstAppSrc
 * @sample: (transfer none): a #GstSample from which buffer and caps may be
 * extracted
 *
 * Extract a buffer from the provided sample and adds it to the queue of
 * buffers that the appsrc element will push to its source pad. Any
 * previous caps that were set on appsrc will be replaced by the caps
 * associated with the sample if not equal.
 *
 * This function does not take ownership of the
 * sample so the sample needs to be unreffed after calling this function.
 *
 * When the block property is TRUE, this function can block until free
 * space becomes available in the queue.
 *
 * Returns: #GST_FLOW_OK when the buffer was successfully queued.
 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
 * #GST_FLOW_EOS when EOS occurred.
 *
 * Since: 1.6
 *
 */
GstFlowReturn
gst_app_src_push_sample (GstAppSrc * appsrc, GstSample * sample)
{
  return gst_app_src_push_sample_internal (appsrc, sample);
}

/* push a buffer without stealing the ref of the buffer. This is used for the
 * action signal. */
static GstFlowReturn
gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
{
  return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
}

/* push a buffer list without stealing the ref of the buffer list. This is
 * used for the action signal. */
static GstFlowReturn
gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
    GstBufferList * buffer_list)
{
  return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
}

/* push a sample without stealing the ref. This is used for the
 * action signal. */
static GstFlowReturn
gst_app_src_push_sample_action (GstAppSrc * appsrc, GstSample * sample)
{
  return gst_app_src_push_sample_internal (appsrc, sample);
}

/**
 * gst_app_src_end_of_stream:
 * @appsrc: a #GstAppSrc
 *
 * Indicates to the appsrc element that the last buffer queued in the
 * element is the last buffer of the stream.
 *
 * Returns: #GST_FLOW_OK when the EOS was successfully queued.
 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
 */
GstFlowReturn
gst_app_src_end_of_stream (GstAppSrc * appsrc)
{
  GstAppSrcPrivate *priv;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);

  priv = appsrc->priv;

  g_mutex_lock (&priv->mutex);
  /* can't accept buffers when we are flushing. We can accept them when we are
   * EOS although it will not do anything. */
  if (priv->flushing)
    goto flushing;

  GST_DEBUG_OBJECT (appsrc, "sending EOS");
  priv->is_eos = TRUE;
  g_cond_broadcast (&priv->cond);
  g_mutex_unlock (&priv->mutex);

  return GST_FLOW_OK;

  /* ERRORS */
flushing:
  {
    g_mutex_unlock (&priv->mutex);
    GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
    return GST_FLOW_FLUSHING;
  }
}

/**
 * gst_app_src_set_callbacks: (skip)
 * @appsrc: a #GstAppSrc
 * @callbacks: the callbacks
 * @user_data: a user_data argument for the callbacks
 * @notify: a destroy notify function
 *
 * Set callbacks which will be executed when data is needed, enough data has
 * been collected or when a seek should be performed.
 * This is an alternative to using the signals, it has lower overhead and is thus
 * less expensive, but also less flexible.
 *
 * If callbacks are installed, no signals will be emitted for performance
 * reasons.
 *
 * Before 1.16.3 it was not possible to change the callbacks in a thread-safe
 * way.
 */
void
gst_app_src_set_callbacks (GstAppSrc * appsrc,
    GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
{
  Callbacks *old_callbacks, *new_callbacks = NULL;
  GstAppSrcPrivate *priv;

  g_return_if_fail (GST_IS_APP_SRC (appsrc));
  g_return_if_fail (callbacks != NULL);

  priv = appsrc->priv;

  if (callbacks) {
    new_callbacks = g_new0 (Callbacks, 1);
    new_callbacks->callbacks = *callbacks;
    new_callbacks->user_data = user_data;
    new_callbacks->destroy_notify = notify;
    new_callbacks->ref_count = 1;
  }

  g_mutex_lock (&priv->mutex);
  old_callbacks = g_steal_pointer (&priv->callbacks);
  priv->callbacks = g_steal_pointer (&new_callbacks);
  g_mutex_unlock (&priv->mutex);

  g_clear_pointer (&old_callbacks, callbacks_unref);
}

/*** GSTURIHANDLER INTERFACE *************************************************/

static GstURIType
gst_app_src_uri_get_type (GType type)
{
  return GST_URI_SRC;
}

static const gchar *const *
gst_app_src_uri_get_protocols (GType type)
{
  static const gchar *protocols[] = { "appsrc", NULL };

  return protocols;
}

static gchar *
gst_app_src_uri_get_uri (GstURIHandler * handler)
{
  GstAppSrc *appsrc = GST_APP_SRC (handler);

  return appsrc->priv->uri ? g_strdup (appsrc->priv->uri) : NULL;
}

static gboolean
gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
    GError ** error)
{
  GstAppSrc *appsrc = GST_APP_SRC (handler);

  g_free (appsrc->priv->uri);
  appsrc->priv->uri = uri ? g_strdup (uri) : NULL;

  return TRUE;
}

static void
gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
{
  GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;

  iface->get_type = gst_app_src_uri_get_type;
  iface->get_protocols = gst_app_src_uri_get_protocols;
  iface->get_uri = gst_app_src_uri_get_uri;
  iface->set_uri = gst_app_src_uri_set_uri;
}

static gboolean
gst_app_src_event (GstBaseSrc * src, GstEvent * event)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
  GstAppSrcPrivate *priv = appsrc->priv;

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_FLUSH_STOP:
      g_mutex_lock (&priv->mutex);
      priv->is_eos = FALSE;
      g_mutex_unlock (&priv->mutex);
      break;
    default:
      break;
  }

  return GST_BASE_SRC_CLASS (parent_class)->event (src, event);
}
